# Building and Deploying our ML Model

**SageMaker Studio Kernel**: Data Science

In this exercise you will:
 - Create and run a model-building pipeline using PyTorch and [SageMaker Pipelines](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines.html)
 - Compute the thresholds used by the application to classify predictions as anomalies or normal behavior
 - Compile and optimize your model for your edge device (Linux X86_64) using [SageMaker NEO](https://docs.aws.amazon.com/sagemaker/latest/dg/neo.html)
 - Create a deployment package with a signed model plus the runtime used by SageMaker Edge Agent to load and invoke the optimized model
 - Deploy the package using IoT Jobs

The following diagram shows all the steps we're going to execute: 
![Pipeline](../imgs/EdgeManagerWorkshop_ModelPipeline.png)

## Part 1/4 - Setup
Here we'll import some libraries and define some variables. You can also take a look at the scripts that were previously created for preparing the data and training our model.

In [None]:
import sagemaker
import numpy as np
import glob
import os
import boto3

In [None]:
project_name='<>' # set this to the name of your SageMaker project

s3_client = boto3.client('s3')
sm_client = boto3.client('sagemaker')

project_id = sm_client.describe_project(ProjectName=project_name)['ProjectId']
bucket_name = 'sagemaker-wind-turbine-farm-%s' % project_id

prefix='wind_turbine_anomaly'
sagemaker_session=sagemaker.Session(default_bucket=bucket_name)
role = sagemaker.get_execution_role()
print('Project name: %s' % project_name)
print('Project id: %s' % project_id)

### Get the dataset and upload it to an S3 bucket
The uploaded dataset will be the input of the data preparation step of our ML pipeline.

In [None]:
# Download the dataset
!mkdir -p data
!curl https://aws-ml-blog.s3.amazonaws.com/artifacts/monitor-manage-anomaly-detection-model-wind-turbine-fleet-sagemaker-neo/dataset_wind_turbine.csv.gz -o data/dataset_wind.csv.gz
# Clean previous data and outputs first. delete_object only removes one exact key,
# so delete every object under each prefix instead
bucket = boto3.resource('s3').Bucket(bucket_name)
for key_prefix in ['%s/data/' % prefix, '%s/output/' % prefix]:
    bucket.objects.filter(Prefix=key_prefix).delete()

input_data = sagemaker_session.upload_data('data/dataset_wind.csv.gz', key_prefix="%s/data" % prefix)
print(input_data)

### Visualize the training script & the preprocessing script

In [None]:
## This script was created to express what we saw in the previous exercise.
## It gets the raw data from the turbine sensors, selects some features,
## denoises, normalizes, encodes and reshapes it as a 6x10x10 tensor.
## This script is the entry point of the first step of the ML Pipeline: data preparation
!pygmentize preprocessing.py
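The comments above say the sensor stream ends up as 6x10x10 tensors. A minimal sketch of that final reshaping, assuming the input is an already normalized `(time, features)` array with 6 features and that each sample is a non-overlapping window of 100 consecutive readings. `to_windows` is a hypothetical helper; the real `preprocessing.py` may overlap windows or order the axes differently. The layout chosen here is the inverse of the reshape used in Part 3 to compute MAE.

```python
import numpy as np

def to_windows(series, rows=10, cols=10):
    """Reshape a (time, features) array into (samples, features, rows, cols).

    Assumption: non-overlapping windows of rows*cols consecutive readings;
    trailing readings that do not fill a whole window are dropped.
    """
    steps = rows * cols
    n_samples = series.shape[0] // steps
    n_features = series.shape[1]
    trimmed = series[: n_samples * steps]
    # (samples, steps, features) -> (samples, features, steps) -> (samples, features, rows, cols)
    windows = trimmed.reshape(n_samples, steps, n_features).transpose(0, 2, 1)
    return windows.reshape(n_samples, n_features, rows, cols)

rng = np.random.default_rng(0)
readings = rng.random((1050, 6))  # fake normalized sensor data: 1050 readings x 6 features
x = to_windows(readings)
print(x.shape)  # (10, 6, 10, 10)
```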

In [None]:
## This is the training/prediction script, used by the training step of 
## our ML Pipeline. In this step, a SageMaker Training Job will run this 
## script to build the model. Then, in the batch transform step,
## the same script will be used again to load the trained model
## and rebuild (predict) all the training samples. These predictions
## will then be used to compute MAE and the thresholds, for detecting anomalies
!pygmentize wind_turbine.py

## Creating our ML Pipeline

### Input parameters of the pipeline
These input parameters can be overridden later if you want. The final pipeline behaves like a function f(x) that you can call many times to train or retrain your model.
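To make the f(x) analogy concrete: each execution resolves every parameter to the value passed in `pipeline.start(parameters=...)`, falling back to its `default_value`. The plain-Python sketch below only models that lookup; it is not how the SDK implements it, and `resolve_parameters` is a hypothetical name. The names and defaults mirror the cell below.

```python
DEFAULTS = {
    "ProcessingInstanceType": "ml.m5.xlarge",
    "ProcessingInstanceCount": 1,
    "TrainingInstanceType": "ml.g4dn.xlarge",
    "TrainingInstanceCount": 1,
    "TransformInstanceType": "ml.c5.xlarge",
    "TransformInstanceCount": 2,
}

def resolve_parameters(defaults, overrides=None):
    """Return the effective parameter values for one pipeline execution."""
    overrides = overrides or {}
    unknown = set(overrides) - set(defaults)
    if unknown:
        # This sketch rejects names that were never declared (an assumption, not SDK behavior)
        raise ValueError("Unknown pipeline parameters: %s" % sorted(unknown))
    # Overrides win; everything else keeps its default
    return {**defaults, **overrides}

effective = resolve_parameters(DEFAULTS, {"TrainingInstanceType": "ml.c5.xlarge"})
print(effective["TrainingInstanceType"])  # ml.c5.xlarge
```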

In [None]:
import time
from sagemaker.workflow.steps import CacheConfig
from sagemaker.workflow.parameters import (
 ParameterInteger,
 ParameterString,
)
## With caching enabled, if you run this pipeline again without changing the input
## parameters, the training step is skipped and the previously trained model is reused
cache_config = CacheConfig(enable_caching=True, expire_after="30d")
ts = time.strftime('%Y-%m-%d-%H-%M-%S')

# Data prep
processing_instance_type = ParameterString( # instance type for data preparation
 name="ProcessingInstanceType",
 default_value="ml.m5.xlarge"
)
processing_instance_count = ParameterInteger( # number of instances used for data preparation
 name="ProcessingInstanceCount",
 default_value=1
)

# Training
training_instance_type = ParameterString( # instance type for training the model
 name="TrainingInstanceType",
 default_value="ml.g4dn.xlarge"
)
training_instance_count = ParameterInteger( # number of instances used to train your model
 name="TrainingInstanceCount",
 default_value=1 # wind_turbine.py supports only 1 instance
)

# Batch prediction
transform_instance_type = ParameterString( # instance type for batch transform jobs
 name="TransformInstanceType",
 default_value="ml.c5.xlarge"
)
transform_instance_count = ParameterInteger( # number of instances used for batch prediction
 name="TransformInstanceCount",
 default_value=2
)

# Dataset input data: S3 path
input_data = ParameterString(
 name="InputData",
 default_value=input_data,
)

# Batch prediction output: S3 path
output_batch_data = ParameterString(
 name="OutputBatchData",
 default_value="s3://%s/%s/output" % (bucket_name, prefix),
)
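The caching comment above can be read as: a step is skipped when an earlier successful run had exactly the same arguments and finished within the `expire_after` window (30 days here). A conceptual sketch of that check, assuming the step arguments can be serialized to JSON. SageMaker performs this lookup in the service; the SHA-256 fingerprint and the `previous_runs` record shape below are illustrative assumptions only.

```python
import hashlib
import json
from datetime import datetime, timedelta, timezone

def cache_key(step_arguments):
    """Stable fingerprint of a step's arguments (key order does not matter)."""
    payload = json.dumps(step_arguments, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()

def find_cache_hit(step_arguments, previous_runs, now, expire_after=timedelta(days=30)):
    """Return a previous successful run with identical arguments that has not expired, else None.

    previous_runs: list of dicts with 'key', 'status' and 'finished_at' (hypothetical shape).
    """
    key = cache_key(step_arguments)
    for run in previous_runs:
        if run["key"] == key and run["status"] == "Succeeded" and now - run["finished_at"] <= expire_after:
            return run
    return None

now = datetime(2021, 6, 1, tzinfo=timezone.utc)
args = {"hyperparameters": {"num_epochs": 200, "batch_size": 256}, "instance_type": "ml.g4dn.xlarge"}
runs = [{"key": cache_key(args), "status": "Succeeded", "finished_at": now - timedelta(days=3)}]
print(find_cache_hit(args, runs, now) is not None)  # True
print(find_cache_hit({**args, "instance_type": "ml.c5.xlarge"}, runs, now))  # None
```

Because the training step's arguments include pipeline parameters, overriding `TrainingInstanceType` at start time would presumably count as a change and trigger a fresh training job.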

## Defining the steps of our pipeline

### Step 1/5 - Preprocess the raw data to clean, denoise and normalize

In [None]:
import boto3
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

sts_client = boto3.client('sts')
account_id = sts_client.get_caller_identity().get('Account')
region = boto3.session.Session().region_name

script_processor = SKLearnProcessor(
 framework_version="0.23-1",
 role=role,
 instance_type=processing_instance_type,
 instance_count=processing_instance_count,
 max_runtime_in_seconds=7200,
)

step_process = ProcessingStep(
 name="WindTurbineDataPreprocess",
 code='preprocessing.py', ## this is the script defined above
 processor=script_processor,
 inputs=[
 ProcessingInput(source=input_data, destination='/opt/ml/processing/input')
 ],
 outputs=[
 ProcessingOutput(output_name='train_data', source='/opt/ml/processing/train'),
 ProcessingOutput(output_name='statistics', source='/opt/ml/processing/statistics')
 ],
 job_arguments=['--num-dataset-splits', '20']
)
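`job_arguments` reach `preprocessing.py` as command-line arguments. A hypothetical sketch of how a script could consume `--num-dataset-splits` with `argparse` and write one `.npy` file per split with `np.array_split`; the `train_%d.npy` naming and `save_splits` helper are assumptions, and the real script may differ. Splitting the output into several files also gives the batch transform step several S3 objects to distribute across its instances (two by default in this notebook).

```python
import argparse
import os
import tempfile

import numpy as np

def parse_args(argv=None):
    parser = argparse.ArgumentParser()
    # argparse exposes '--num-dataset-splits' as args.num_dataset_splits
    parser.add_argument("--num-dataset-splits", type=int, default=1)
    return parser.parse_args(argv)

def save_splits(samples, num_splits, output_dir):
    """Write samples into num_splits roughly equal .npy files and return their paths."""
    os.makedirs(output_dir, exist_ok=True)
    paths = []
    for i, chunk in enumerate(np.array_split(samples, num_splits)):
        path = os.path.join(output_dir, "train_%d.npy" % i)  # hypothetical naming scheme
        np.save(path, chunk)
        paths.append(path)
    return paths

args = parse_args(["--num-dataset-splits", "20"])  # same arguments as job_arguments above
samples = np.zeros((1000, 6, 10, 10), dtype=np.float32)
with tempfile.TemporaryDirectory() as out_dir:  # stands in for /opt/ml/processing/train
    paths = save_splits(samples, args.num_dataset_splits, out_dir)
    print(len(paths), np.load(paths[0]).shape)  # 20 (50, 6, 10, 10)
```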

### Step 2/5 - Training with a PyTorch Estimator

#### First we create the SageMaker Estimator

In [None]:
from sagemaker.pytorch.estimator import PyTorch
framework_version='1.6.0'
py_version='py3'
estimator = PyTorch(
 'wind_turbine.py', ## This is the script (wind_turbine.py) defined above
 framework_version=framework_version,
 role=role,
 sagemaker_session=sagemaker_session,
 instance_type=training_instance_type,
 instance_count=training_instance_count,
 py_version=py_version, 
 hyperparameters={
 'k_fold_splits': 6,
 'k_index_only': 3, # after running some experiments with this dataset, it makes sense to fix it
 'num_epochs': 200,
 'batch_size': 256,
 'learning_rate': 0.0001,
 'dropout_rate': 0.001
 },
 metric_definitions=[
 {'Name': 'train_loss:mse', 'Regex': r' train_loss=(\S+);'},
 {'Name': 'test_loss:mse', 'Regex': r' test_loss=(\S+);'}
 ]
)
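SageMaker applies each `Regex` in `metric_definitions` to the training job's log output and records the value matched by the capture group as the metric. The sketch below runs the same patterns locally with Python's `re` module on a made-up log line; the exact line format printed by `wind_turbine.py` is an assumption here, so check it with `!pygmentize wind_turbine.py` before relying on the patterns. `extract_metrics` is a hypothetical helper.

```python
import re

metric_definitions = [
    {"Name": "train_loss:mse", "Regex": r" train_loss=(\S+);"},
    {"Name": "test_loss:mse", "Regex": r" test_loss=(\S+);"},
]

# Hypothetical log line; the real format comes from wind_turbine.py
log_line = "epoch=10; train_loss=0.0123; test_loss=0.0456;"

def extract_metrics(line, definitions):
    """Return {metric_name: float} for every definition whose regex matches the line."""
    metrics = {}
    for definition in definitions:
        match = re.search(definition["Regex"], line)
        if match:
            metrics[definition["Name"]] = float(match.group(1))
    return metrics

print(extract_metrics(log_line, metric_definitions))
# {'train_loss:mse': 0.0123, 'test_loss:mse': 0.0456}
```

The leading space in each pattern means a metric printed at the very start of a line would not match.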

#### Then, we define the training step

In [None]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

step_train = TrainingStep(
 name="WindTurbineAnomalyTrain",
 estimator=estimator,
 inputs={"train": TrainingInput(
 s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri,
 content_type="application/x-npy"
 )},
 cache_config=cache_config
)

### Step 4/5 - Create a new model in the SageMaker Models Catalog
This step turns the artifacts produced by your training job into a SageMaker Model. After that, you'll be able to deploy and invoke your model.
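At minimum, a SageMaker Model ties together a container image, the S3 URI of the model artifacts produced by training, and an IAM role. The sketch below only builds the request body for the low-level `CreateModel` API (`sm_client.create_model(**request)` in boto3) so you can see those three pieces; `build_create_model_request` is a hypothetical helper, and the model name, image URI, S3 path and role ARN are placeholders. In this notebook, `CreateModelStep` issues the call for you.

```python
def build_create_model_request(model_name, image_uri, model_data_url, role_arn):
    """Assemble keyword arguments for boto3's SageMaker client create_model()."""
    if not model_data_url.startswith("s3://"):
        raise ValueError("model artifacts must be an S3 URI")
    return {
        "ModelName": model_name,
        "PrimaryContainer": {
            "Image": image_uri,              # e.g. the PyTorch inference image
            "ModelDataUrl": model_data_url,  # model.tar.gz written by the training job
        },
        "ExecutionRoleArn": role_arn,
    }

request = build_create_model_request(
    model_name="wind-turbine-anomaly-example",  # hypothetical
    image_uri="123456789012.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:1.6.0-cpu-py3",  # hypothetical
    model_data_url="s3://example-bucket/example-job/output/model.tar.gz",  # hypothetical
    role_arn="arn:aws:iam::123456789012:role/ExampleSageMakerRole",  # hypothetical
)
print(sorted(request))  # ['ExecutionRoleArn', 'ModelName', 'PrimaryContainer']
# sm_client.create_model(**request) would register the model (not run here)
```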

In [None]:
from sagemaker.workflow.steps import CreateModelStep
from sagemaker.model import Model
from sagemaker.inputs import CreateModelInput

model = Model(
 image_uri=sagemaker.image_uris.retrieve(
 framework="pytorch", # we are using the SageMaker pre-built PyTorch inference image
 region=sagemaker_session.boto_session.region_name,
 version=framework_version,
 py_version=py_version,
 instance_type=transform_instance_type, # match the image (CPU/GPU) to the batch transform instances
 image_scope='inference'
 ),
 model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
 sagemaker_session=sagemaker_session,
 role=role
)
step_create_model = CreateModelStep(
 name="WindTurbineAnomalyCreateModel",
 model=model,
 inputs=CreateModelInput(
 instance_type=transform_instance_type
 )
)

### Step 5/5 - Run a batch transform job to get all the predictions
The predictions will then be used to compute the thresholds. Only the training samples are used in this step.

In [None]:
from sagemaker.workflow.steps import TransformStep
from sagemaker.inputs import TransformInput
from sagemaker.transformer import Transformer

step_transform = TransformStep(
 name="WindTurbineAnomalyTransform",
 transformer=Transformer(
 model_name=step_create_model.properties.ModelName,
 instance_type=transform_instance_type,
 instance_count=transform_instance_count,
 output_path=output_batch_data,
 accept='application/x-npy',
 max_payload=20,
 strategy='MultiRecord',
 assemble_with='Line'
 ),
 inputs=TransformInput(data=step_process.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri, content_type="application/x-npy")
)
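Batch transform writes one output object per input object, named after the input key with `.out` appended (for example `train_0.npy` becomes `train_0.npy.out`). Part 3 relies on inputs and predictions lining up sample by sample, so pairing files by name is safer than relying on listing order. A small sketch of that pairing, assuming the local folders used later in this notebook (`data/input/` and `data/preds/`); the file names and the `pair_inputs_with_predictions` helper are hypothetical.

```python
import os

def pair_inputs_with_predictions(input_paths, pred_paths):
    """Match each input file to the batch transform output named '<input>.out'."""
    preds_by_name = {os.path.basename(p): p for p in pred_paths}
    pairs = []
    for path in sorted(input_paths):
        expected = os.path.basename(path) + ".out"
        if expected not in preds_by_name:
            raise FileNotFoundError("missing prediction for %s" % path)
        pairs.append((path, preds_by_name[expected]))
    return pairs

inputs = ["data/input/train_1.npy", "data/input/train_0.npy"]
preds = ["data/preds/train_0.npy.out", "data/preds/train_1.npy.out"]
for x_path, y_path in pair_inputs_with_predictions(inputs, preds):
    print(x_path, "->", y_path)
```

Prediction files with no matching input (for example leftovers from an older run) are simply ignored by this pairing.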

### Now that we have created all the required steps, it's time to create our pipeline
This code creates a physical resource (with an ARN) that will execute all the steps defined above.

In [None]:
from botocore.exceptions import ClientError, ValidationError
from sagemaker.workflow.pipeline import Pipeline

# NOTE: condition steps had issues in the service, so this pipeline has no condition
# or model registration step; it runs the four steps below in sequence
pipeline_name = "WindTurbineAnomalyTrain-%s" % ts
pipeline = Pipeline(
 name=pipeline_name,
 parameters=[
 processing_instance_type,
 processing_instance_count, 
 training_instance_type,
 training_instance_count, 
 transform_instance_type,
 transform_instance_count, 
 input_data,
 output_batch_data
 ],
 steps=[step_process, step_train, step_create_model, step_transform],
 sagemaker_session=sagemaker_session,
)

try:
    response = pipeline.create(role_arn=role)
except ClientError as e:
    error = e.response["Error"]
    if error["Code"] == "ValidationError" and "Pipeline names must be unique within" in error["Message"]:
        print(error["Message"])
        response = pipeline.describe()
    else:
        raise

## The following code adds some tags that will be tracked by SageMaker Studio
pipeline_arn = response["PipelineArn"]
sm_client = boto3.client('sagemaker')
sm_client.add_tags(
 ResourceArn=pipeline_arn,
 Tags=[
 {'Key': 'sagemaker:project-name', 'Value': project_name },
 {'Key': 'sagemaker:project-id', 'Value': project_id }
 ]
)
print(pipeline_arn)

## Part 2/4 - Now it's time to execute our pipeline
After kicking off the pipeline, you can open SageMaker Studio and go to your project -> Pipelines to follow the execution. The whole pipeline takes about 17 minutes to complete.

In [None]:
%%time
start_response = pipeline.start(parameters={
 # 'TrainingInstanceType': 'ml.c5.xlarge', # uncomment this line if your account doesn't support g4 instances. Training will take about 20x longer.
 'TransformInstanceType': 'ml.c5.xlarge'
})

pipeline_execution_arn = start_response.arn
print(pipeline_execution_arn)

while True:
    resp = sm_client.describe_pipeline_execution(PipelineExecutionArn=pipeline_execution_arn)
    if resp['PipelineExecutionStatus'] == 'Executing':
        print('Running...')
    else:
        print(resp['PipelineExecutionStatus'], pipeline_execution_arn)
        break
    time.sleep(30)
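This notebook polls three different jobs (pipeline execution, Neo compilation, edge packaging) with the same while-loop pattern. A hedged sketch of a reusable helper with a timeout; `wait_for` and its arguments are hypothetical names, not part of boto3 or the SageMaker SDK. The timeout is approximate because it counts polling intervals rather than wall-clock time.

```python
import time

def wait_for(get_status, running_states, interval=30, timeout=3600, sleep=time.sleep):
    """Poll get_status() until it leaves running_states or the timeout elapses.

    Returns the final status; raises TimeoutError if still running at the deadline.
    """
    waited = 0  # sum of polling intervals, an approximation of elapsed time
    while True:
        status = get_status()
        if status not in running_states:
            return status
        if waited >= timeout:
            raise TimeoutError("still %s after %d seconds" % (status, waited))
        print("Running...")
        sleep(interval)
        waited += interval

# Usage with this notebook's clients (not executed here):
# status = wait_for(
#     lambda: sm_client.describe_pipeline_execution(
#         PipelineExecutionArn=pipeline_execution_arn)['PipelineExecutionStatus'],
#     running_states={'Executing'}, interval=30)

# Local check with a fake status sequence and no real sleeping
statuses = iter(["Executing", "Executing", "Succeeded"])
print(wait_for(lambda: next(statuses), {"Executing"}, sleep=lambda s: None))
```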

## Part 3/4 - Compute the thresholds based on MAE

### Download the predictions & Compute MAE/thresholds
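The cells below compute, for every training sample $i$ and feature $f$, the mean absolute reconstruction error over the $T = n_{rows} \cdot n_{cols}$ time steps of a window (100 here), where $x$ is the input window and $\hat{x}$ the model's reconstruction. Averaging over all $N$ training samples then gives one threshold $\tau_f$ per feature (NaN MAE values are set to 0 before averaging):

$$
\mathrm{MAE}_{i,f} = \frac{1}{T}\sum_{t=1}^{T}\left|\hat{x}_{i,t,f} - x_{i,t,f}\right|,
\qquad
\tau_f = \frac{1}{N}\sum_{i=1}^{N}\mathrm{MAE}_{i,f}
$$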

In [None]:
sagemaker_session.download_data(bucket=bucket_name, key_prefix='%s/output/' % prefix, path='data/preds/')

In [None]:
import boto3
import sagemaker

sm_client = boto3.client('sagemaker')
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()

execution_id = pipeline_execution_arn.split('/')[-1]
training_jobs = sm_client.list_training_jobs(NameContains=execution_id, StatusEquals='Completed')['TrainingJobSummaries']

assert len(training_jobs) == 1, 'expected exactly one completed training job for this execution'
training_job_name=training_jobs[0]['TrainingJobName']

# We will recreate the estimator, based on the training job
estimator = sagemaker.estimator.Estimator.attach(
 training_job_name=training_job_name, 
 sagemaker_session=sagemaker_session
)
input_data = sm_client.describe_training_job(TrainingJobName=training_job_name)
input_data = input_data['InputDataConfig'][0]['DataSource']['S3DataSource']['S3Uri']

tokens = input_data.split('/', 3)
sagemaker_session.download_data(bucket=tokens[2], key_prefix=tokens[3], path='data/input/')

In [None]:
import os
import numpy as np
import glob

# Sort both listings so each input file lines up with its '<name>.out' prediction
x_inputs = np.vstack([np.load(i) for i in sorted(glob.glob('data/input/*.npy'))])
y_preds = np.vstack([np.load(i) for i in sorted(glob.glob('data/preds/*.out'))])

n_samples,n_features,n_rows,n_cols = x_inputs.shape

x_inputs = x_inputs.reshape(n_samples, n_features, n_rows*n_cols).transpose((0,2,1))
y_preds = y_preds.reshape(n_samples, n_features, n_rows*n_cols).transpose((0,2,1))

mae_loss = np.mean(np.abs(y_preds - x_inputs), axis=1).transpose((1,0))
mae_loss[np.isnan(mae_loss)] = 0

thresholds = np.mean(mae_loss, axis=1)
os.makedirs('statistics', exist_ok=True)
np.save('statistics/thresholds.npy', thresholds)
print(",".join(thresholds.astype(str)))
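A small worked check of the same computation on toy data (2 samples, 2 features, 2x2 windows instead of 6x10x10), followed by one way an application could use the thresholds: flag a feature as anomalous when the MAE of a new window exceeds its threshold. `mae_per_feature` and `flag_anomalies` are hypothetical helpers, and the strict "greater than" rule is an assumption for illustration; the edge application in the next exercise may apply the thresholds differently.

```python
import numpy as np

def mae_per_feature(x, y):
    """MAE per (feature, sample) for arrays shaped (samples, features, rows, cols)."""
    n_samples, n_features, n_rows, n_cols = x.shape
    x = x.reshape(n_samples, n_features, n_rows * n_cols).transpose((0, 2, 1))
    y = y.reshape(n_samples, n_features, n_rows * n_cols).transpose((0, 2, 1))
    mae = np.mean(np.abs(y - x), axis=1).transpose((1, 0))  # shape (features, samples)
    mae[np.isnan(mae)] = 0
    return mae

def flag_anomalies(window_mae, thresholds):
    """Return a boolean per feature: True where the window's MAE exceeds the threshold."""
    return np.asarray(window_mae) > np.asarray(thresholds)

x_toy = np.zeros((2, 2, 2, 2))
y_toy = np.zeros((2, 2, 2, 2))
y_toy[0, 0] = 0.1        # sample 0, feature 0: every reconstruction off by 0.1
y_toy[1, 0] = 0.3        # sample 1, feature 0: every reconstruction off by 0.3
y_toy[1, 1, 0, 0] = 0.4  # sample 1, feature 1: one of four values off by 0.4

# Feature 0: mean(0.1, 0.3) = 0.2; feature 1: mean(0.0, 0.4 / 4) = 0.05
toy_thresholds = np.mean(mae_per_feature(x_toy, y_toy), axis=1)
print(toy_thresholds)
print(flag_anomalies([0.25, 0.01], toy_thresholds))  # [ True False]
```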

## Part 4/4 - Compiling/Packaging/Deploying our ML model to our edge devices

### Invoking SageMaker NEO to compile the trained model

In [None]:
compilation_job_name = 'wind-turbine-anomaly-%d' % int(time.time()*1000)
sm_client.create_compilation_job(
 CompilationJobName=compilation_job_name,
 RoleArn=role,
 InputConfig={
 'S3Uri': estimator.model_data, # S3 URI of the model.tar.gz produced by the training job
 'DataInputConfig': '{"input0":[1,%d,10,10]}' % n_features,
 'Framework': 'PYTORCH'
 },
 OutputConfig={
 'S3OutputLocation': 's3://%s/wind_turbine/optimized/' % sagemaker_session.default_bucket(), 
 'TargetPlatform': { 'Os': 'LINUX', 'Arch': 'X86_64' }
 },
 StoppingCondition={ 'MaxRuntimeInSeconds': 900 }
)
while True:
    resp = sm_client.describe_compilation_job(CompilationJobName=compilation_job_name)
    if resp['CompilationJobStatus'] in ['STARTING', 'INPROGRESS']:
        print('Running...')
    else:
        print(resp['CompilationJobStatus'], compilation_job_name)
        break
    time.sleep(5)
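`DataInputConfig` tells SageMaker Neo the name and shape of each model input as a JSON string; for PyTorch models the shape is given in NCHW order. Here that is one sample (batch size 1) of `n_features` channels over a 10x10 window. A sketch that builds the same string with `json.dumps` instead of `%` formatting, which avoids quoting mistakes; `neo_data_input_config` is a hypothetical helper and `input0` is the input name the cell above uses.

```python
import json

def neo_data_input_config(n_features, rows=10, cols=10, batch_size=1, input_name="input0"):
    """Build the DataInputConfig JSON string (NCHW shape) for a PyTorch model."""
    return json.dumps({input_name: [batch_size, n_features, rows, cols]})

config = neo_data_input_config(6)
print(config)  # {"input0": [1, 6, 10, 10]}
```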

### Building the deployment package with SageMaker Edge Manager
This step signs the model and creates a deployment package containing:
 - The optimized model
 - Model Metadata
 - SageMaker NEO runtime (dlr)

In [None]:
import time
model_version = '1.0'
model_name = 'WindTurbineAnomalyDetection'
edge_packaging_job_name='wind-turbine-anomaly-%d' % int(time.time()*1000)
resp = sm_client.create_edge_packaging_job(
 EdgePackagingJobName=edge_packaging_job_name,
 CompilationJobName=compilation_job_name,
 ModelName=model_name,
 ModelVersion=model_version,
 RoleArn=role,
 OutputConfig={
 'S3OutputLocation': 's3://%s/%s/model/' % (bucket_name, prefix)
 }
)
while True:
    resp = sm_client.describe_edge_packaging_job(EdgePackagingJobName=edge_packaging_job_name)
    if resp['EdgePackagingJobStatus'] in ['STARTING', 'INPROGRESS']:
        print('Running...')
    else:
        print(resp['EdgePackagingJobStatus'], edge_packaging_job_name)
        break
    time.sleep(5)

### Deploy the package
Using IoT Jobs, we will notify the Python application on the edge devices. The application will:
 - download the deployment package
 - unpack it
 - load the new model (unloading previous versions, if any)
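The IoT job document created in the cells that follow is plain JSON with the fields `type`, `model_version`, `model_name`, `model_package_bucket` and `model_package_key`. A hypothetical sketch of the first thing an edge application could do on receiving it: validate the document and work out where the package lives. `parse_new_model_job` and the bucket name are illustrative assumptions; downloading and unpacking would follow (for example with boto3's S3 `download_file` and Python's `tarfile`) but are left out so the sketch runs offline.

```python
import json

REQUIRED_FIELDS = ("type", "model_version", "model_name",
                   "model_package_bucket", "model_package_key")

def parse_new_model_job(document):
    """Validate a 'new_model' job document and return its fields plus the package S3 URI."""
    job = json.loads(document)
    missing = [f for f in REQUIRED_FIELDS if f not in job]
    if missing:
        raise ValueError("job document is missing %s" % missing)
    if job["type"] != "new_model":
        raise ValueError("unsupported job type %r" % job["type"])
    job["package_uri"] = "s3://%s/%s" % (job["model_package_bucket"], job["model_package_key"])
    return job

doc = json.dumps({
    "type": "new_model",
    "model_version": "1.0",
    "model_name": "WindTurbineAnomalyDetection",
    "model_package_bucket": "example-bucket",  # hypothetical bucket name
    "model_package_key": "wind_turbine_anomaly/model/WindTurbineAnomalyDetection-1.0.tar.gz",
})
print(parse_new_model_job(doc)["package_uri"])
# s3://example-bucket/wind_turbine_anomaly/model/WindTurbineAnomalyDetection-1.0.tar.gz
```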

In [None]:
import boto3
import json
import sagemaker
import uuid

iot_client = boto3.client('iot')
sts_client = boto3.client('sts')

model_version = '1.0'
model_name = 'WindTurbineAnomalyDetection'
sagemaker_session=sagemaker.Session()
region_name = sagemaker_session.boto_session.region_name
account_id = sts_client.get_caller_identity()["Account"]

In [None]:
resp = iot_client.create_job(
 jobId=str(uuid.uuid4()),
 targets=[
 'arn:aws:iot:%s:%s:thinggroup/WindTurbineFarm-%s' % (region_name, account_id, project_id), 
 ],
 document=json.dumps({
 'type': 'new_model',
 'model_version': model_version,
 'model_name': model_name,
 'model_package_bucket': bucket_name,
 'model_package_key': "%s/model/%s-%s.tar.gz" % (prefix, model_name, model_version) 
 }),
 targetSelection='SNAPSHOT'
)

Alright! Now, the deployment process will start on the connected edge devices! 
You can now start Exercise #3: Run a simulated fleet of wind turbines and edge devices, and predict anomalies.

 > [Exercise 03](03%20-%20Run%20Fleet.ipynb)
